package org.example.consumer;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Properties;

@Component
public class KafkaConsumerBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerBuilder.class);
    private static final String GROUP_ID = "kafka.consumer.group-id";
    private static final String BOOTSTRAP_SERVERS = "kafka.consumer.bootstrap-servers";
    private static final String TOPIC = "kafka.consumer.topic";
    private static final String AUTO_COMMIT = "kafka.consumer.auto-commit";
    private static final String COMMIT_INTERVAL = "kafka.consumer.commit-interval";
    @Autowired
    private Environment env;

    public KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.setProperty("group.id", env.getProperty(GROUP_ID));
        // props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.setProperty("bootstrap.servers", env.getProperty(BOOTSTRAP_SERVERS));
        props.setProperty("enable.auto.commit", env.getProperty(AUTO_COMMIT));
        props.setProperty("auto.commit.interval.ms", env.getProperty(COMMIT_INTERVAL));
        // 通过配置支持其他的序列化类型要修改代码, 先用JsonStr吧.
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(env.getProperty(TOPIC)));

        return consumer;
    }
}
